package org.apache.lucene.index;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.NumericRangeQuery;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.MockDirectoryWrapper.Throttling;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.TestUtil;

/**
 * Demo of on-the-fly reindexing with ParallelLeafReader: every document carries a single
 * stored text field of the form "number N".  As segments are flushed or merged, a small
 * parallel index is built per segment from the stored fields, adding N as a numeric
 * doc-values field (and, in some variants, an indexed numeric field).  On every NRT
 * reopen each segment leaf is joined with its parallel leaf, so searches can sort and
 * run numeric range queries on fields the original index never had.  Parallel indices
 * live in per-segment directories under a private "segs" root and are pruned once their
 * segment (or schema generation) is no longer live.
 */
public class TestDemoParallelLeafReader extends LuceneTestCase {

  static final boolean DEBUG = false;

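  /** Maintains a primary index plus, for each (segment, schema generation) pair, a private
   *  parallel index that is built on demand by {@link #reindex} during NRT reopen or
   *  merged-segment warming. */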
  static abstract class ReindexingReader implements Closeable {

    // Key used to record the schema generation in each segment's diagnostics:
    public final static String SCHEMA_GEN_KEY = "schema_gen";

    public final IndexWriter w;
    public final ReaderManager mgr;

    private final Directory indexDir;
    private final Path root;
    private final Path segsPath;

    // Segments whose parallel reader has been closed; their parallel index directories may now be pruned:
    private final Set<SegmentIDAndGen> closedSegments = Collections.newSetFromMap(new ConcurrentHashMap<SegmentIDAndGen,Boolean>());

    // Currently open parallel readers, keyed by segment ID + schema generation:
    private final Map<SegmentIDAndGen,LeafReader> parallelReaders = new ConcurrentHashMap<>();

    void printRefCounts() {
      System.out.println("All refCounts:");
      for(Map.Entry<SegmentIDAndGen,LeafReader> ent : parallelReaders.entrySet()) {
        System.out.println(" " + ent.getKey() + " " + ent.getValue() + " refCount=" + ent.getValue().getRefCount());
      }
    }

    public ReindexingReader(Path root) throws IOException {
      this.root = root;

      // The main index lives under root/index:
      indexDir = openDirectory(root.resolve("index"));

      // Per-segment parallel indices live under root/segs:
      segsPath = root.resolve("segs");
      Files.createDirectories(segsPath);

      IndexWriterConfig iwc = getIndexWriterConfig();
      iwc.setMergePolicy(new ReindexingMergePolicy(iwc.getMergePolicy()));
      if (DEBUG) {
        System.out.println("TEST: use IWC:\n" + iwc);
      }
      w = new IndexWriter(indexDir, iwc);
      w.getConfig().setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
          @Override
          public void warm(LeafReader reader) throws IOException {
            if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now warm " + reader);

            // Reindex without caching the resulting reader; the reopen will open (and cache) it:
            getParallelLeafReader(reader, false, getCurrentSchemaGen());
          }
        });

      // Create an initial (empty) commit:
      w.commit();
      mgr = new ReaderManager(new ParallelLeafDirectoryReader(DirectoryReader.open(w, true)));
    }

    /** Supplies the IndexWriterConfig for the main index; its merge policy will be wrapped. */
    protected abstract IndexWriterConfig getIndexWriterConfig() throws IOException;

    /** Optional hook to validate a parallel reader against its source segment; no-op by default. */
    protected void checkParallelReader(LeafReader reader, LeafReader parallelReader, long schemaGen) throws IOException {
    }

    /** Override to customize the Directory used for the main index and for each parallel index. */
    protected Directory openDirectory(Path path) throws IOException {
      return FSDirectory.open(path);
    }

    public void commit() throws IOException {
      w.commit();
    }

    LeafReader getCurrentReader(LeafReader reader, long schemaGen) throws IOException {
      LeafReader parallelReader = getParallelLeafReader(reader, true, schemaGen);
      if (parallelReader != null) {

        // Never wrap an already-wrapped reader:
        assertFalse(parallelReader instanceof ParallelLeafReader);
        assertFalse(reader instanceof ParallelLeafReader);

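        // The parallel reader comes first so its (reindexed) fields win any field-name conflict;
        // live docs and numDocs still come from the original segment (readers[1]) because only the
        // main index receives deletions: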
        LeafReader newReader = new ParallelLeafReader(false, parallelReader, reader) {
            @Override
            public Bits getLiveDocs() {
              return getParallelReaders()[1].getLiveDocs();
            }
            @Override
            public int numDocs() {
              return getParallelReaders()[1].numDocs();
            }
          };

        // ParallelLeafReader incRef'd the parallel reader, so we can release our ref now:
        parallelReader.decRef();

        return newReader;

      } else {
        // This segment is already current as of the requested schemaGen:
        return reader;
      }
    }

    private class ParallelLeafDirectoryReader extends FilterDirectoryReader {
      public ParallelLeafDirectoryReader(DirectoryReader in) throws IOException {
        super(in, new FilterDirectoryReader.SubReaderWrapper() {
            final long currentSchemaGen = getCurrentSchemaGen();
            @Override
            public LeafReader wrap(LeafReader reader) {
              try {
                return getCurrentReader(reader, currentSchemaGen);
              } catch (IOException ioe) {
                // SubReaderWrapper.wrap cannot throw a checked exception:
                throw new RuntimeException(ioe);
              }
            }
          });
      }

      @Override
      protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
        return new ParallelLeafDirectoryReader(in);
      }

      @Override
      protected void doClose() throws IOException {
        Throwable firstExc = null;
        for (final LeafReader r : getSequentialSubReaders()) {
          if (r instanceof ParallelLeafReader) {
            // Release the ref we took when joining this leaf with its parallel reader:
            try {
              r.decRef();
            } catch (Throwable t) {
              if (firstExc == null) {
                firstExc = t;
              }
            }
          }
        }

        // Close the original reader even if a decRef above failed:
        try {
          in.doClose();
        } catch (Throwable t) {
          if (firstExc == null) {
            firstExc = t;
          }
        }

        // Throw the first exception we hit, if any:
        IOUtils.reThrow(firstExc);
      }
    }

    @Override
    public void close() throws IOException {
      w.close();
      if (DEBUG) System.out.println("TEST: after close writer index=" + SegmentInfos.readLatestCommit(indexDir));

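      // Order matters: closing the manager releases all parallel readers, which lets the pruning
      // below remove their directories; only then can we assert nothing was left behind: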
      mgr.close();
      pruneOldSegments(true);
      assertNoExtraSegments();
      indexDir.close();
    }

    /** Asserts that no parallel index remains for segments that are no longer live. */
    private void assertNoExtraSegments() throws IOException {
      Set<String> liveIDs = new HashSet<String>();
      for(SegmentCommitInfo info : SegmentInfos.readLatestCommit(indexDir)) {
        String idString = StringHelper.idToString(info.info.getId());
        liveIDs.add(idString);
      }

      // After pruning, any segment still marked closed must be a live one:
      for(SegmentIDAndGen segIDGen : closedSegments) {
        assertTrue(liveIDs.contains(segIDGen.segID));
      }

      boolean fail = false;
      for(Path path : segSubDirs(segsPath)) {
        SegmentIDAndGen segIDGen = new SegmentIDAndGen(path.getFileName().toString());
        if (liveIDs.contains(segIDGen.segID) == false) {
          if (DEBUG) System.out.println("TEST: fail seg=" + path.getFileName() + " is not live but still has a parallel index");
          fail = true;
        }
      }
      assertFalse(fail);
    }

    private static class SegmentIDAndGen {
      public final String segID;
      public final long schemaGen;

      public SegmentIDAndGen(String segID, long schemaGen) {
        this.segID = segID;
        this.schemaGen = schemaGen;
      }

      public SegmentIDAndGen(String s) {
        String[] parts = s.split("_");
        if (parts.length != 2) {
          throw new IllegalArgumentException("invalid SegmentIDAndGen \"" + s + "\"");
        }

        segID = parts[0];
        schemaGen = Long.parseLong(parts[1]);
      }

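      // Note: multiplying by schemaGen means gen 0 hashes every segment to the same bucket (the
      // simple reindexer below always uses gen 0); acceptable for a test-scale map.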
      @Override
      public int hashCode() {
        return (int) (segID.hashCode() * schemaGen);
      }

      @Override
      public boolean equals(Object _other) {
        if (_other instanceof SegmentIDAndGen) {
          SegmentIDAndGen other = (SegmentIDAndGen) _other;
          return segID.equals(other.segID) && schemaGen == other.schemaGen;
        } else {
          return false;
        }
      }

      @Override
      public String toString() {
        return segID + "_" + schemaGen;
      }
    }

    /** Closes the parallel index directory and marks the segment prunable once its reader is fully released. */
    private class ParallelReaderClosed implements LeafReader.ReaderClosedListener {
      private final SegmentIDAndGen segIDGen;
      private final Directory dir;

      public ParallelReaderClosed(SegmentIDAndGen segIDGen, Directory dir) {
        this.segIDGen = segIDGen;
        this.dir = dir;
      }

      @Override
      public void onClose(IndexReader ignored) {
        try {
          // Take the same lock as getParallelLeafReader so an open/prune can't race with this close:
          synchronized(ReindexingReader.this) {
            if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now close parallel parLeafReader dir=" + dir + " segIDGen=" + segIDGen);
            parallelReaders.remove(segIDGen);
            dir.close();
            closedSegments.add(segIDGen);
          }
        } catch (IOException ioe) {
          System.out.println("TEST: hit IOExc closing dir=" + dir);
          ioe.printStackTrace(System.out);
          throw new RuntimeException(ioe);
        }
      }
    }

    /** Returns the parallel reader for this segment and schema generation, building the parallel
     *  index first if necessary.  When {@code doCache} is true the returned reader is cached and
     *  incRef'd for the caller; returns null if the segment is already on the requested gen. */
    LeafReader getParallelLeafReader(final LeafReader leaf, boolean doCache, long schemaGen) throws IOException {
      assert leaf instanceof SegmentReader;
      SegmentInfo info = ((SegmentReader) leaf).getSegmentInfo().info;

      long infoSchemaGen = getSchemaGen(info);

      if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: getParallelLeafReader: " + leaf + " infoSchemaGen=" + infoSchemaGen + " vs schemaGen=" + schemaGen + " doCache=" + doCache);

      if (infoSchemaGen == schemaGen) {
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: segment is already current schemaGen=" + schemaGen + "; skipping");
        return null;
      }

      if (infoSchemaGen > schemaGen) {
        throw new IllegalStateException("segment infoSchemaGen (" + infoSchemaGen + ") cannot be greater than requested schemaGen (" + schemaGen + ")");
      }

      final SegmentIDAndGen segIDGen = new SegmentIDAndGen(StringHelper.idToString(info.getId()), schemaGen);

      // Retry loop: a cached parallel reader may be closed out from under us before we can incRef it:
      while (true) {

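        // One coarse lock: a single thread at a time may build or open a parallel segment index
        // (a per-segment lock would allow more concurrency); the tryIncRef in the else branch
        // below handles a reader closing between lookup and use: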
        synchronized (this) {
          LeafReader parReader = parallelReaders.get(segIDGen);

          assert doCache || parReader == null;

          if (parReader == null) {

            Path leafIndex = segsPath.resolve(segIDGen.toString());

            final Directory dir = openDirectory(leafIndex);

            if (Files.exists(leafIndex.resolve("done")) == false) {
              if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: build segment index for " + leaf + " " + segIDGen + " (source: " + info.getDiagnostics().get("source") + ") dir=" + leafIndex);

              if (dir.listAll().length != 0) {
                // A previous reindex attempt died part way through; start over:
                if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: remove old incomplete index files: " + leafIndex);
                IOUtils.rm(leafIndex);
              }

              reindex(infoSchemaGen, schemaGen, leaf, dir);

              // Marker file recording that the reindex completed; if we crash mid-reindex the
              // marker is missing and we redo the work next time:
              dir.createOutput("done", IOContext.DEFAULT).close();
            } else {
              if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: segment index already exists for " + leaf + " " + segIDGen + " (source: " + info.getDiagnostics().get("source") + ") dir=" + leafIndex);
            }

            if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now check index " + dir);

            // If the reindex produced a single segment, expose it directly as a SegmentReader;
            // otherwise we must (slowly) wrap the composite reader as a single leaf:
            SegmentInfos infos = SegmentInfos.readLatestCommit(dir);
            final LeafReader parLeafReader;
            if (infos.size() == 1) {
              parLeafReader = new SegmentReader(infos.info(0), IOContext.DEFAULT);
            } else {
              // This just means we didn't forceMerge above:
              parLeafReader = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir));
            }

            if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: opened parallel reader: " + parLeafReader);
            if (doCache) {
              parallelReaders.put(segIDGen, parLeafReader);

              // This id+gen may have been marked closed earlier (e.g. by merged-segment warming);
              // un-mark it so pruning doesn't delete the directory we just reopened:
              closedSegments.remove(segIDGen);

              parLeafReader.addReaderClosedListener(new ParallelReaderClosed(segIDGen, dir));

            } else {
              // Used only for merged-segment warming; we close the reader right away instead of
              // keeping it open for reuse:
              if (DEBUG) System.out.println("TEST: now decRef non cached refCount=" + parLeafReader.getRefCount());
              parLeafReader.decRef();
              dir.close();

              // Must happen after dir is closed, else another thread could prune (delete) the
              // directory while we are still closing it:
              closedSegments.add(segIDGen);
            }

            // NOTE: in the non-cached (warming) case this reader was already released above; the
            // warming caller ignores the return value, so that is harmless:
            parReader = parLeafReader;

          } else {
            if (parReader.tryIncRef() == false) {
              // The cached reader was closed by another thread (e.g. a refresh) just before we
              // could incRef it; it will soon be removed from the map, so retry:
              if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: tryIncRef failed for " + parReader + "; retry");
              parReader = null;
              continue;
            }
            if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: use existing already opened parReader=" + parReader + " refCount=" + parReader.getRefCount());
          }

          // Success: return the (incRef'd, if cached) parallel reader:
          return parReader;
        }
      }
    }

    /** Reindexes the given segment from {@code oldSchemaGen} to {@code newSchemaGen}, writing the
     *  derived fields into {@code parallelDir} in exactly the source segment's docID order. */
    protected abstract void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException;

    /** Returns the schema generation that reopened readers should reindex to. */
    protected abstract long getCurrentSchemaGen();

    /** Returns the schema generation merges should reindex to; must be {@code <=} the current gen. */
    protected long getMergingSchemaGen() {
      return getCurrentSchemaGen();
    }

    /** Deletes parallel indices for segments that are no longer in the last commit, and (when
     *  {@code removeOldGens} is true) also those for schema generations older than the current one. */
    private void pruneOldSegments(boolean removeOldGens) throws IOException {
      SegmentInfos lastCommit = SegmentInfos.readLatestCommit(indexDir);
      if (DEBUG) System.out.println("TEST: prune");

      Set<String> liveIDs = new HashSet<String>();
      for(SegmentCommitInfo info : lastCommit) {
        String idString = StringHelper.idToString(info.info.getId());
        liveIDs.add(idString);
      }

      long currentSchemaGen = getCurrentSchemaGen();

      if (Files.exists(segsPath)) {
        for (Path path : segSubDirs(segsPath)) {
          if (Files.isDirectory(path)) {
            SegmentIDAndGen segIDGen = new SegmentIDAndGen(path.getFileName().toString());
            assert segIDGen.schemaGen <= currentSchemaGen;
            if (liveIDs.contains(segIDGen.segID) == false && (closedSegments.contains(segIDGen) || (removeOldGens && segIDGen.schemaGen < currentSchemaGen))) {
              if (DEBUG) System.out.println("TEST: remove " + segIDGen);
              try {
                IOUtils.rm(path);
                closedSegments.remove(segIDGen);
              } catch (IOException ioe) {
                // OK: another thread may be racing us to close/delete; we'll prune it next time:
                if (DEBUG) System.out.println("TEST: ignore ioe during delete " + path + ":" + ioe);
              }
            }
          }
        }
      }
    }

    /** Merge policy that swaps each merge's readers for their parallel readers, so the reindexed
     *  fields are written directly into the newly merged segment. */
    private class ReindexingMergePolicy extends MergePolicy {

      class ReindexingOneMerge extends OneMerge {

        List<LeafReader> parallelReaders;
        final long schemaGen;

        ReindexingOneMerge(List<SegmentCommitInfo> segments) {
          super(segments);

          // Pin the schema gen for this merge up front; a concurrent gen change must not leave the
          // merged segment mixing generations:
          schemaGen = getMergingSchemaGen();
          long currentSchemaGen = getCurrentSchemaGen();

          if (schemaGen > currentSchemaGen) {
            throw new IllegalStateException("currentSchemaGen (" + currentSchemaGen + ") must always be >= mergingSchemaGen (" + schemaGen + ")");
          }
        }

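        // Substitute each segment being merged with its parallel reader, so the merged segment is
        // born with the reindexed fields; the wrapped readers are cached so mergeFinished can
        // release exactly what we acquired: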
        @Override
        public List<CodecReader> getMergeReaders() throws IOException {
          if (parallelReaders == null) {
            parallelReaders = new ArrayList<>();
            for (CodecReader reader : super.getMergeReaders()) {
              parallelReaders.add(getCurrentReader((SegmentReader)reader, schemaGen));
            }
          }

          // OneMerge hands out CodecReaders:
          List<CodecReader> mergeReaders = new ArrayList<>();
          for (LeafReader reader : parallelReaders) {
            mergeReaders.add(SlowCodecReaderWrapper.wrap(reader));
          }
          return mergeReaders;
        }

        @Override
        public void mergeFinished() throws IOException {
          Throwable th = null;
          for(LeafReader r : parallelReaders) {
            if (r instanceof ParallelLeafReader) {
              try {
                r.decRef();
              } catch (Throwable t) {
                if (th == null) {
                  th = t;
                }
              }
            }
          }

          // Throw the first exception we hit, if any:
          IOUtils.reThrow(th);
        }

        @Override
        public void setMergeInfo(SegmentCommitInfo info) {
          // Record the schema gen this merged segment was reindexed to:
          info.info.getDiagnostics().put(SCHEMA_GEN_KEY, Long.toString(schemaGen));
          super.setMergeInfo(info);
        }

        @Override
        public MergePolicy.DocMap getDocMap(final MergeState mergeState) {
          return super.getDocMap(mergeState);
        }
      }

      class ReindexingMergeSpecification extends MergeSpecification {
        @Override
        public void add(OneMerge merge) {
          super.add(new ReindexingOneMerge(merge.segments));
        }

        @Override
        public String segString(Directory dir) {
          return "ReindexingMergeSpec(" + super.segString(dir) + ")";
        }
      }

      MergeSpecification wrap(MergeSpecification spec) {
        MergeSpecification wrapped = null;
        if (spec != null) {
          wrapped = new ReindexingMergeSpecification();
          for (OneMerge merge : spec.merges) {
            wrapped.add(merge);
          }
        }
        return wrapped;
      }

      final MergePolicy in;
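      /** The wrapped policy picks which segments to merge; this class only decorates each OneMerge. */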
      public ReindexingMergePolicy(MergePolicy in) {
        this.in = in;
      }

      @Override
      public MergeSpecification findMerges(MergeTrigger mergeTrigger,
                                           SegmentInfos segmentInfos, IndexWriter writer) throws IOException {
        return wrap(in.findMerges(mergeTrigger, segmentInfos, writer));
      }

      @Override
      public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
                                                 int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer)
        throws IOException {
        return wrap(in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer));
      }

      @Override
      public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, IndexWriter writer)
        throws IOException {
        return wrap(in.findForcedDeletesMerges(segmentInfos, writer));
      }

      @Override
      public boolean useCompoundFile(SegmentInfos segments,
                                     SegmentCommitInfo newSegment, IndexWriter writer) throws IOException {
        return in.useCompoundFile(segments, newSegment, writer);
      }

      @Override
      public String toString() {
        return "ReindexingMergePolicy(" + in + ")";
      }
    }

    static long getSchemaGen(SegmentInfo info) {
      String s = info.getDiagnostics().get(SCHEMA_GEN_KEY);
      if (s == null) {
        return -1;
      } else {
        return Long.parseLong(s);
      }
    }
  }

  /** Reindexer that derives a single "number" DV + numeric field from the stored "text" field (schema gen is always 0). */
  private ReindexingReader getReindexer(Path root) throws IOException {
    return new ReindexingReader(root) {
      @Override
      protected IndexWriterConfig getIndexWriterConfig() throws IOException {
        IndexWriterConfig iwc = newIndexWriterConfig();
        TieredMergePolicy tmp = new TieredMergePolicy();
        // Allow tiny segments so we exercise many of them:
        tmp.setFloorSegmentMB(.01);
        iwc.setMergePolicy(tmp);
        return iwc;
      }

      @Override
      protected Directory openDirectory(Path path) throws IOException {
        MockDirectoryWrapper dir = newMockFSDirectory(path);
        dir.setUseSlowOpenClosers(false);
        dir.setThrottling(Throttling.NEVER);
        return dir;
      }

      @Override
      protected void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException {
        IndexWriterConfig iwc = newIndexWriterConfig();

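        // Docs must be written in exactly the source segment's docID order, and the merge policy
        // must never reorder them; LogByteSizeMergePolicy only merges adjacent segments, so the
        // parallel docIDs stay aligned: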
        iwc.setMergePolicy(new LogByteSizeMergePolicy());
        IndexWriter w = new IndexWriter(parallelDir, iwc);
        int maxDoc = reader.maxDoc();

        // Slowly parse the stored field back into a number and index it as DV + numeric field:
        for(int i=0;i<maxDoc;i++) {
          Document oldDoc = reader.document(i);
          Document newDoc = new Document();
          long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
          newDoc.add(new NumericDocValuesField("number", value));
          newDoc.add(new LongField("number", value, Field.Store.NO));
          w.addDocument(newDoc);
        }

        if (random().nextBoolean()) {
          w.forceMerge(1);
        }

        w.close();
      }

      @Override
      protected long getCurrentSchemaGen() {
        return 0;
      }
    };
  }

  /** Reindexer that writes a fresh DV field, "number_" + schemaGen, for every schema generation. */
  private ReindexingReader getReindexerNewDVFields(Path root, final AtomicLong currentSchemaGen) throws IOException {
    return new ReindexingReader(root) {
      @Override
      protected IndexWriterConfig getIndexWriterConfig() throws IOException {
        IndexWriterConfig iwc = newIndexWriterConfig();
        TieredMergePolicy tmp = new TieredMergePolicy();
        // Allow tiny segments so we exercise many of them:
        tmp.setFloorSegmentMB(.01);
        iwc.setMergePolicy(tmp);
        return iwc;
      }

      @Override
      protected Directory openDirectory(Path path) throws IOException {
        MockDirectoryWrapper dir = newMockFSDirectory(path);
        dir.setUseSlowOpenClosers(false);
        dir.setThrottling(Throttling.NEVER);
        return dir;
      }

      @Override
      protected void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException {
        IndexWriterConfig iwc = newIndexWriterConfig();

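        // Same constraint as above: preserve docID order exactly; LogByteSizeMergePolicy never
        // reorders docs: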
        iwc.setMergePolicy(new LogByteSizeMergePolicy());
        IndexWriter w = new IndexWriter(parallelDir, iwc);
        int maxDoc = reader.maxDoc();

        if (oldSchemaGen <= 0) {
          // Initial schema: slowly parse the stored field into a new doc values field:
          for(int i=0;i<maxDoc;i++) {
            Document oldDoc = reader.document(i);
            Document newDoc = new Document();
            long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
            newDoc.add(new NumericDocValuesField("number_" + newSchemaGen, value));
            newDoc.add(new LongField("number", value, Field.Store.NO));
            w.addDocument(newDoc);
          }
        } else {
          // Upgrade: carry the values over from the previous gen's field:
          NumericDocValues oldValues = reader.getNumericDocValues("number_" + oldSchemaGen);
          assertNotNull("oldSchemaGen=" + oldSchemaGen, oldValues);
          for(int i=0;i<maxDoc;i++) {
            Document oldDoc = reader.document(i);
            Document newDoc = new Document();
            newDoc.add(new NumericDocValuesField("number_" + newSchemaGen, oldValues.get(i)));
            w.addDocument(newDoc);
          }
        }

        if (random().nextBoolean()) {
          w.forceMerge(1);
        }

        w.close();
      }

      @Override
      protected long getCurrentSchemaGen() {
        return currentSchemaGen.get();
      }

      @Override
      protected void checkParallelReader(LeafReader r, LeafReader parR, long schemaGen) throws IOException {
        String fieldName = "number_" + schemaGen;
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now check parallel number DVs field=" + fieldName + " r=" + r + " parR=" + parR);
        NumericDocValues numbers = parR.getNumericDocValues(fieldName);
        if (numbers == null) {
          return;
        }
        int maxDoc = r.maxDoc();
        boolean failed = false;
        for(int i=0;i<maxDoc;i++) {
          Document oldDoc = r.document(i);
          long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
          if (value != numbers.get(i)) {
            if (DEBUG) System.out.println("FAIL: docID=" + i + " " + oldDoc + " value=" + value + " number=" + numbers.get(i) + " numbers=" + numbers);
            failed = true;
          } else if (failed) {
            if (DEBUG) System.out.println("OK: docID=" + i + " " + oldDoc + " value=" + value + " number=" + numbers.get(i));
          }
        }
        assertFalse("FAILED field=" + fieldName + " r=" + r, failed);
      }
    };
  }

  /** Reindexer that keeps a single "number" DV field across schema generations but rescales its values on every gen change. */
  private ReindexingReader getReindexerSameDVField(Path root, final AtomicLong currentSchemaGen, final AtomicLong mergingSchemaGen) throws IOException {
    return new ReindexingReader(root) {
      @Override
      protected IndexWriterConfig getIndexWriterConfig() throws IOException {
        IndexWriterConfig iwc = newIndexWriterConfig();
        TieredMergePolicy tmp = new TieredMergePolicy();
        // Allow tiny segments so we exercise many of them:
        tmp.setFloorSegmentMB(.01);
        iwc.setMergePolicy(tmp);
        if (TEST_NIGHTLY) {
          // During nightly tests we might otherwise use too many file handles:
          iwc.setUseCompoundFile(true);
        }
        return iwc;
      }

      @Override
      protected Directory openDirectory(Path path) throws IOException {
        MockDirectoryWrapper dir = newMockFSDirectory(path);
        dir.setUseSlowOpenClosers(false);
        dir.setThrottling(Throttling.NEVER);
        return dir;
      }

      @Override
      protected void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException {
        IndexWriterConfig iwc = newIndexWriterConfig();

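        // Same constraint as above: preserve docID order exactly; LogByteSizeMergePolicy never
        // reorders docs: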
        iwc.setMergePolicy(new LogByteSizeMergePolicy());
        IndexWriter w = new IndexWriter(parallelDir, iwc);
        int maxDoc = reader.maxDoc();

        if (oldSchemaGen <= 0) {
          // Initial schema: the DV value is the parsed number times the schema gen:
          for(int i=0;i<maxDoc;i++) {
            Document oldDoc = reader.document(i);
            Document newDoc = new Document();
            long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
            newDoc.add(new NumericDocValuesField("number", newSchemaGen*value));
            newDoc.add(new LongField("number", value, Field.Store.NO));
            w.addDocument(newDoc);
          }
        } else {
          // Upgrade: rescale the previous gen's values to the new gen:
          NumericDocValues oldValues = reader.getNumericDocValues("number");
          assertNotNull("oldSchemaGen=" + oldSchemaGen, oldValues);
          for(int i=0;i<maxDoc;i++) {
            Document oldDoc = reader.document(i);
            Document newDoc = new Document();
            newDoc.add(new NumericDocValuesField("number", newSchemaGen*(oldValues.get(i)/oldSchemaGen)));
            w.addDocument(newDoc);
          }
        }

        if (random().nextBoolean()) {
          w.forceMerge(1);
        }

        w.close();
      }

      @Override
      protected long getCurrentSchemaGen() {
        return currentSchemaGen.get();
      }

      @Override
      protected long getMergingSchemaGen() {
        return mergingSchemaGen.get();
      }

      @Override
      protected void checkParallelReader(LeafReader r, LeafReader parR, long schemaGen) throws IOException {
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now check parallel number DVs r=" + r + " parR=" + parR);
        NumericDocValues numbers = parR.getNumericDocValues("number");
        if (numbers == null) {
          return;
        }
        int maxDoc = r.maxDoc();
        boolean failed = false;
        for(int i=0;i<maxDoc;i++) {
          Document oldDoc = r.document(i);
          long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
          value *= schemaGen;
          if (value != numbers.get(i)) {
            System.out.println("FAIL: docID=" + i + " " + oldDoc + " value=" + value + " number=" + numbers.get(i) + " numbers=" + numbers);
            failed = true;
          } else if (failed) {
            System.out.println("OK: docID=" + i + " " + oldDoc + " value=" + value + " number=" + numbers.get(i));
          }
        }
        assertFalse("FAILED r=" + r, failed);
      }
    };
  }

  public void testBasicMultipleSchemaGens() throws Exception {

    AtomicLong currentSchemaGen = new AtomicLong();

    ReindexingReader reindexer = getReindexerNewDVFields(createTempDir(), currentSchemaGen);
    reindexer.commit();

    Document doc = new Document();
    doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
    reindexer.w.addDocument(doc);

    if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: refresh @ 1 doc");
    reindexer.mgr.maybeRefresh();
    DirectoryReader r = reindexer.mgr.acquire();
    if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: got reader=" + r);
    try {
      checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1);
    } finally {
      reindexer.mgr.release(r);
    }

    // Force a schema change:
    currentSchemaGen.incrementAndGet();

    if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: increment schemaGen");
    if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: commit");
    reindexer.commit();

    doc = new Document();
    doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
    reindexer.w.addDocument(doc);

    if (DEBUG) System.out.println("TEST: refresh @ 2 docs");
    reindexer.mgr.maybeRefresh();

    r = reindexer.mgr.acquire();
    if (DEBUG) System.out.println("TEST: got reader=" + r);
    try {
      checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1);
    } finally {
      reindexer.mgr.release(r);
    }

    if (DEBUG) System.out.println("TEST: forceMerge");
    reindexer.w.forceMerge(1);

    currentSchemaGen.incrementAndGet();

    if (DEBUG) System.out.println("TEST: commit");
    reindexer.commit();

    if (DEBUG) System.out.println("TEST: refresh after forceMerge");
    reindexer.mgr.maybeRefresh();
    r = reindexer.mgr.acquire();
    if (DEBUG) System.out.println("TEST: got reader=" + r);
    try {
      checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1);
    } finally {
      reindexer.mgr.release(r);
    }

    if (DEBUG) System.out.println("TEST: close writer");
    reindexer.close();
  }

  public void testRandomMultipleSchemaGens() throws Exception {

    AtomicLong currentSchemaGen = new AtomicLong();
    ReindexingReader reindexer = null;

    int numDocs = atLeast(TEST_NIGHTLY ? 20000 : 1000);
    int maxID = 0;
    Path root = createTempDir();
    int refreshEveryNumDocs = 100;
    int commitCloseNumDocs = 1000;
    for(int i=0;i<numDocs;i++) {
      if (reindexer == null) {
        reindexer = getReindexerNewDVFields(root, currentSchemaGen);
      }

      Document doc = new Document();
      String id;
      String updateID;
      if (maxID > 0 && random().nextInt(10) == 7) {
        // Replace a previous document:
        id = "" + random().nextInt(maxID);
        updateID = id;
      } else {
        id = "" + (maxID++);
        updateID = null;
      }

      doc.add(newStringField("id", id, Field.Store.NO));
      doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
      if (updateID == null) {
        reindexer.w.addDocument(doc);
      } else {
        reindexer.w.updateDocument(new Term("id", updateID), doc);
      }
      if (random().nextInt(refreshEveryNumDocs) == 17) {
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: refresh @ " + (i+1) + " docs");
        reindexer.mgr.maybeRefresh();

        DirectoryReader r = reindexer.mgr.acquire();
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: got reader=" + r);
        try {
          checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1);
        } finally {
          reindexer.mgr.release(r);
        }
        if (DEBUG) reindexer.printRefCounts();
        refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs);
      }

      if (random().nextInt(500) == 17) {
        currentSchemaGen.incrementAndGet();
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: advance schemaGen to " + currentSchemaGen);
      }

      if (i > 0 && random().nextInt(10) == 7) {
        // Random delete:
        reindexer.w.deleteDocuments(new Term("id", "" + random().nextInt(i)));
      }

      if (random().nextInt(commitCloseNumDocs) == 17) {
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: commit @ " + (i+1) + " docs");
        reindexer.commit();
        // Commit less often as the index grows:
        commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
      }

      // Sometimes close & reopen writer/manager, to confirm the parallel segments persist:
      if (random().nextInt(commitCloseNumDocs) == 17) {
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: close writer @ " + (i+1) + " docs");
        reindexer.close();
        reindexer = null;
        commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
      }
    }

    if (reindexer != null) {
      reindexer.close();
    }
  }

  /** Like testRandomMultipleSchemaGens, but every schema change keeps the same field name and
   *  instead rescales its values, and merges may lag behind the current schema gen. */
  public void testRandomMultipleSchemaGensSameField() throws Exception {

    AtomicLong currentSchemaGen = new AtomicLong();
    AtomicLong mergingSchemaGen = new AtomicLong();

    ReindexingReader reindexer = null;

    int numDocs = atLeast(TEST_NIGHTLY ? 20000 : 1000);
    int maxID = 0;
    Path root = createTempDir();
    int refreshEveryNumDocs = 100;
    int commitCloseNumDocs = 1000;

    for(int i=0;i<numDocs;i++) {
      if (reindexer == null) {
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: open new reader/writer");
        reindexer = getReindexerSameDVField(root, currentSchemaGen, mergingSchemaGen);
      }

      Document doc = new Document();
      String id;
      String updateID;
      if (maxID > 0 && random().nextInt(10) == 7) {
        // Replace a previous document:
        id = "" + random().nextInt(maxID);
        updateID = id;
      } else {
        id = "" + (maxID++);
        updateID = null;
      }

      doc.add(newStringField("id", id, Field.Store.NO));
      doc.add(newTextField("text", "number " + TestUtil.nextInt(random(), -10000, 10000), Field.Store.YES));
      if (updateID == null) {
        reindexer.w.addDocument(doc);
      } else {
        reindexer.w.updateDocument(new Term("id", updateID), doc);
      }
      if (random().nextInt(refreshEveryNumDocs) == 17) {
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: refresh @ " + (i+1) + " docs");
        reindexer.mgr.maybeRefresh();
        DirectoryReader r = reindexer.mgr.acquire();
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: got reader=" + r);
        try {
          checkAllNumberDVs(r, "number", true, (int) currentSchemaGen.get());
        } finally {
          reindexer.mgr.release(r);
        }
        if (DEBUG) reindexer.printRefCounts();
        refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs);
      }

      if (random().nextInt(500) == 17) {
        currentSchemaGen.incrementAndGet();
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: advance schemaGen to " + currentSchemaGen);
        if (random().nextBoolean()) {
          mergingSchemaGen.incrementAndGet();
          if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: advance mergingSchemaGen to " + mergingSchemaGen);
        }
      }

      if (i > 0 && random().nextInt(10) == 7) {
        // Random delete:
        reindexer.w.deleteDocuments(new Term("id", "" + random().nextInt(i)));
      }

      if (random().nextInt(commitCloseNumDocs) == 17) {
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: commit @ " + (i+1) + " docs");
        reindexer.commit();
        commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
      }

      // Sometimes close & reopen writer/manager, to confirm the parallel segments persist:
      if (random().nextInt(commitCloseNumDocs) == 17) {
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: close writer @ " + (i+1) + " docs");
        reindexer.close();
        reindexer = null;
        commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
      }
    }

    if (reindexer != null) {
      reindexer.close();
    }
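    // Verify the main index's own DV values never got ahead of the merging schema gen: every
    // merged value must be (gen * original value) for some gen <= mergingSchemaGen: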
    try (Directory dir = newFSDirectory(root.resolve("index"));
         IndexReader r = DirectoryReader.open(dir)) {
      for (LeafReaderContext ctx : r.leaves()) {
        LeafReader leaf = ctx.reader();
        NumericDocValues numbers = leaf.getNumericDocValues("number");
        if (numbers != null) {
          int maxDoc = leaf.maxDoc();
          for(int i=0;i<maxDoc;i++) {
            Document doc = leaf.document(i);
            long value = Long.parseLong(doc.get("text").split(" ")[1]);
            long dvValue = numbers.get(i);
            if (value == 0) {
              assertEquals(0, dvValue);
            } else {
              assertTrue(dvValue % value == 0);
              assertTrue(dvValue / value <= mergingSchemaGen.get());
            }
          }
        }
      }
    }
  }

  public void testBasic() throws Exception {
    ReindexingReader reindexer = getReindexer(createTempDir());

    // Start with an initial commit:
    reindexer.commit();

    Document doc = new Document();
    doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
    reindexer.w.addDocument(doc);

    if (DEBUG) System.out.println("TEST: refresh @ 1 doc");
    reindexer.mgr.maybeRefresh();
    DirectoryReader r = reindexer.mgr.acquire();
    if (DEBUG) System.out.println("TEST: got reader=" + r);
    try {
      checkAllNumberDVs(r);
      IndexSearcher s = newSearcher(r);
      testNumericDVSort(s);
      testNumericRangeQuery(s);
    } finally {
      reindexer.mgr.release(r);
    }

    if (DEBUG) System.out.println("TEST: commit");
    reindexer.commit();

    doc = new Document();
    doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
    reindexer.w.addDocument(doc);

    if (DEBUG) System.out.println("TEST: refresh @ 2 docs");
    reindexer.mgr.maybeRefresh();

    r = reindexer.mgr.acquire();
    if (DEBUG) System.out.println("TEST: got reader=" + r);
    try {
      checkAllNumberDVs(r);
      IndexSearcher s = newSearcher(r);
      testNumericDVSort(s);
      testNumericRangeQuery(s);
    } finally {
      reindexer.mgr.release(r);
    }

    if (DEBUG) System.out.println("TEST: forceMerge");
    reindexer.w.forceMerge(1);

    if (DEBUG) System.out.println("TEST: commit");
    reindexer.commit();

    if (DEBUG) System.out.println("TEST: refresh after forceMerge");
    reindexer.mgr.maybeRefresh();
    r = reindexer.mgr.acquire();
    if (DEBUG) System.out.println("TEST: got reader=" + r);
    try {
      checkAllNumberDVs(r);
      IndexSearcher s = newSearcher(r);
      testNumericDVSort(s);
      testNumericRangeQuery(s);
    } finally {
      reindexer.mgr.release(r);
    }

    if (DEBUG) System.out.println("TEST: close writer");
    reindexer.close();
  }

  public void testRandom() throws Exception {
    Path root = createTempDir();
    ReindexingReader reindexer = null;

    int numDocs = atLeast(TEST_NIGHTLY ? 20000 : 1000);
    int maxID = 0;
    int refreshEveryNumDocs = 100;
    int commitCloseNumDocs = 1000;
    for(int i=0;i<numDocs;i++) {
      if (reindexer == null) {
        reindexer = getReindexer(root);
      }

      Document doc = new Document();
      String id;
      String updateID;
      if (maxID > 0 && random().nextInt(10) == 7) {
        // Replace a previous document:
        id = "" + random().nextInt(maxID);
        updateID = id;
      } else {
        id = "" + (maxID++);
        updateID = null;
      }

      doc.add(newStringField("id", id, Field.Store.NO));
      doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
      if (updateID == null) {
        reindexer.w.addDocument(doc);
      } else {
        reindexer.w.updateDocument(new Term("id", updateID), doc);
      }

      if (random().nextInt(refreshEveryNumDocs) == 17) {
        if (DEBUG) System.out.println("TEST: refresh @ " + (i+1) + " docs");
        reindexer.mgr.maybeRefresh();
        DirectoryReader r = reindexer.mgr.acquire();
        if (DEBUG) System.out.println("TEST: got reader=" + r);
        try {
          checkAllNumberDVs(r);
          IndexSearcher s = newSearcher(r);
          testNumericDVSort(s);
          testNumericRangeQuery(s);
        } finally {
          reindexer.mgr.release(r);
        }
        refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs);
      }

      if (i > 0 && random().nextInt(10) == 7) {
        // Random delete:
        reindexer.w.deleteDocuments(new Term("id", "" + random().nextInt(i)));
      }

      if (random().nextInt(commitCloseNumDocs) == 17) {
        if (DEBUG) System.out.println("TEST: commit @ " + (i+1) + " docs");
        reindexer.commit();
        commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
      }

      // Sometimes close & reopen writer/manager, to confirm the parallel segments persist:
      if (random().nextInt(commitCloseNumDocs) == 17) {
        if (DEBUG) System.out.println("TEST: close writer @ " + (i+1) + " docs");
        reindexer.close();
        reindexer = null;
        commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
      }
    }
    if (reindexer != null) {
      reindexer.close();
    }
  }

  private static void checkAllNumberDVs(IndexReader r) throws IOException {
    checkAllNumberDVs(r, "number", true, 1);
  }

  private static void checkAllNumberDVs(IndexReader r, String fieldName, boolean doThrow, int multiplier) throws IOException {
    NumericDocValues numbers = MultiDocValues.getNumericValues(r, fieldName);
    int maxDoc = r.maxDoc();
    boolean failed = false;
    for(int i=0;i<maxDoc;i++) {
      Document oldDoc = r.document(i);
      long value = multiplier * Long.parseLong(oldDoc.get("text").split(" ")[1]);
      if (value != numbers.get(i)) {
        System.out.println("FAIL: docID=" + i + " " + oldDoc + " value=" + value + " number=" + numbers.get(i) + " numbers=" + numbers);
        failed = true;
      } else if (failed) {
        System.out.println("OK: docID=" + i + " " + oldDoc + " value=" + value + " number=" + numbers.get(i));
      }
    }
    if (failed) {
      if (r instanceof LeafReader == false) {
        // Re-check each leaf to narrow down which segment is broken:
        System.out.println("TEST FAILED; check leaves");
        for(LeafReaderContext ctx : r.leaves()) {
          System.out.println("CHECK LEAF=" + ctx.reader());
          checkAllNumberDVs(ctx.reader(), fieldName, false, 1);
        }
      }
      if (doThrow) {
        assertFalse("FAILED field=" + fieldName + " r=" + r, failed);
      } else {
        System.out.println("FAILED field=" + fieldName + " r=" + r);
      }
    }
  }

  /** Confirms we can sort by the newly added DV field: */
  private static void testNumericDVSort(IndexSearcher s) throws IOException {
    TopDocs hits = s.search(new MatchAllDocsQuery(), 100, new Sort(new SortField("number", SortField.Type.LONG)));
    NumericDocValues numbers = MultiDocValues.getNumericValues(s.getIndexReader(), "number");
    long last = Long.MIN_VALUE;
    for(ScoreDoc scoreDoc : hits.scoreDocs) {
      long value = Long.parseLong(s.doc(scoreDoc.doc).get("text").split(" ")[1]);
      assertTrue(value >= last);
      assertEquals(value, numbers.get(scoreDoc.doc));
      last = value;
    }
  }

  /** Confirms we can range search on the newly added numeric field: */
  private static void testNumericRangeQuery(IndexSearcher s) throws IOException {
    NumericDocValues numbers = MultiDocValues.getNumericValues(s.getIndexReader(), "number");
    for(int i=0;i<100;i++) {
      // Pick a random range and verify every hit falls inside it:
      long min = random().nextLong();
      long max = random().nextLong();
      if (min > max) {
        long x = min;
        min = max;
        max = x;
      }

      TopDocs hits = s.search(NumericRangeQuery.newLongRange("number", min, max, true, true), 100);
      for(ScoreDoc scoreDoc : hits.scoreDocs) {
        long value = Long.parseLong(s.doc(scoreDoc.doc).get("text").split(" ")[1]);
        assertTrue(value >= min);
        assertTrue(value <= max);
        assertEquals(value, numbers.get(scoreDoc.doc));
      }
    }
  }
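  // Parallel index sub-directories are named "<segmentID>_<schemaGen>"; StringHelper.idToString
  // produces lowercase alphanumeric segment IDs: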
  static final Pattern SEG_GEN_SUB_DIR_PATTERN = Pattern.compile("^[a-z0-9]+_([0-9]+)$");

  private static List<Path> segSubDirs(Path segsPath) throws IOException {
    List<Path> result = new ArrayList<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(segsPath)) {
      for (Path path : stream) {
        // Skip stray files (e.g. the "done" marker); only <segID>_<gen> directories count:
        if (Files.isDirectory(path) && SEG_GEN_SUB_DIR_PATTERN.matcher(path.getFileName().toString()).matches()) {
          result.add(path);
        }
      }
    }

    return result;
  }
}